package com.kafka.interceptor;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * 数一下消息发送成败数量
 * @author 苗建峰
 *
 */

public class CountInterceptor implements ProducerInterceptor<String, String>{

	private long success = 0;
	private long fail = 0;

	@Override
	public void configure(Map<String, ?> configs) {
		// TODO Auto-generated method stub

	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

		return record;

	}

	/**
	 * 收到ack后做计数
	 */
	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

		if (exception == null) {

			success++;

		} else {

			fail++;

		}

	}

	/**
	 * 发送完成后输出结果
	 */
	@Override
	public void close() {

		System.out.println(String.format("成功了%d条", success));
		System.out.println(String.format("失败了%d条", fail));

	}

}
